home *** CD-ROM | disk | FTP | other *** search
/ PC World Komputer 2010 April / PCWorld0410.iso / hity wydania / Ubuntu 9.10 PL / karmelkowy-koliberek-desktop-9.10-i386-PL.iso / casper / filesystem.squashfs / usr / lib / python2.6 / multiprocessing / queues.pyc (.txt) < prev    next >
Python Compiled Bytecode  |  2009-11-11  |  12KB  |  379 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. __all__ = [
  5.     'Queue',
  6.     'SimpleQueue',
  7.     'JoinableQueue']
  8. import sys
  9. import os
  10. import threading
  11. import collections
  12. import time
  13. import atexit
  14. import weakref
  15. from Queue import Empty, Full
  16. import _multiprocessing
  17. from multiprocessing import Pipe
  18. from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
  19. from multiprocessing.util import debug, info, Finalize, register_after_fork
  20. from multiprocessing.forking import assert_spawning
  21.  
  22. class Queue(object):
  23.     
  24.     def __init__(self, maxsize = 0):
  25.         if maxsize <= 0:
  26.             maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
  27.         
  28.         self._maxsize = maxsize
  29.         (self._reader, self._writer) = Pipe(duplex = False)
  30.         self._rlock = Lock()
  31.         self._opid = os.getpid()
  32.         if sys.platform == 'win32':
  33.             self._wlock = None
  34.         else:
  35.             self._wlock = Lock()
  36.         self._sem = BoundedSemaphore(maxsize)
  37.         self._after_fork()
  38.         if sys.platform != 'win32':
  39.             register_after_fork(self, Queue._after_fork)
  40.         
  41.  
  42.     
  43.     def __getstate__(self):
  44.         assert_spawning(self)
  45.         return (self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid)
  46.  
  47.     
  48.     def __setstate__(self, state):
  49.         (self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) = state
  50.         self._after_fork()
  51.  
  52.     
  53.     def _after_fork(self):
  54.         debug('Queue._after_fork()')
  55.         self._notempty = threading.Condition(threading.Lock())
  56.         self._buffer = collections.deque()
  57.         self._thread = None
  58.         self._jointhread = None
  59.         self._joincancelled = False
  60.         self._closed = False
  61.         self._close = None
  62.         self._send = self._writer.send
  63.         self._recv = self._reader.recv
  64.         self._poll = self._reader.poll
  65.  
  66.     
  67.     def put(self, obj, block = True, timeout = None):
  68.         if not not (self._closed):
  69.             raise AssertionError
  70.         if not self._sem.acquire(block, timeout):
  71.             raise Full
  72.         self._sem.acquire(block, timeout)
  73.         self._notempty.acquire()
  74.         
  75.         try:
  76.             if self._thread is None:
  77.                 self._start_thread()
  78.             
  79.             self._buffer.append(obj)
  80.             self._notempty.notify()
  81.         finally:
  82.             self._notempty.release()
  83.  
  84.  
  85.     
  86.     def get(self, block = True, timeout = None):
  87.         if block and timeout is None:
  88.             self._rlock.acquire()
  89.             
  90.             try:
  91.                 res = self._recv()
  92.                 self._sem.release()
  93.                 return res
  94.             finally:
  95.                 self._rlock.release()
  96.  
  97.         elif block:
  98.             deadline = time.time() + timeout
  99.         
  100.         if not self._rlock.acquire(block, timeout):
  101.             raise Empty
  102.         self._rlock.acquire(block, timeout)
  103.         
  104.         try:
  105.             if not block or deadline - time.time():
  106.                 pass
  107.             if not self._poll(0):
  108.                 raise Empty
  109.             self._poll(0)
  110.             res = self._recv()
  111.             self._sem.release()
  112.             return res
  113.         finally:
  114.             self._rlock.release()
  115.  
  116.  
  117.     
  118.     def qsize(self):
  119.         return self._maxsize - self._sem._semlock._get_value()
  120.  
  121.     
  122.     def empty(self):
  123.         return not self._poll()
  124.  
  125.     
  126.     def full(self):
  127.         return self._sem._semlock._is_zero()
  128.  
  129.     
  130.     def get_nowait(self):
  131.         return self.get(False)
  132.  
  133.     
  134.     def put_nowait(self, obj):
  135.         return self.put(obj, False)
  136.  
  137.     
  138.     def close(self):
  139.         self._closed = True
  140.         self._reader.close()
  141.         if self._close:
  142.             self._close()
  143.         
  144.  
  145.     
  146.     def join_thread(self):
  147.         debug('Queue.join_thread()')
  148.         if not self._closed:
  149.             raise AssertionError
  150.         if self._jointhread:
  151.             self._jointhread()
  152.         
  153.  
  154.     
  155.     def cancel_join_thread(self):
  156.         debug('Queue.cancel_join_thread()')
  157.         self._joincancelled = True
  158.         
  159.         try:
  160.             self._jointhread.cancel()
  161.         except AttributeError:
  162.             pass
  163.  
  164.  
  165.     
  166.     def _start_thread(self):
  167.         debug('Queue._start_thread()')
  168.         self._buffer.clear()
  169.         self._thread = threading.Thread(target = Queue._feed, args = (self._buffer, self._notempty, self._send, self._wlock, self._writer.close), name = 'QueueFeederThread')
  170.         self._thread.daemon = True
  171.         debug('doing self._thread.start()')
  172.         self._thread.start()
  173.         debug('... done self._thread.start()')
  174.         created_by_this_process = self._opid == os.getpid()
  175.         if not (self._joincancelled) and not created_by_this_process:
  176.             self._jointhread = Finalize(self._thread, Queue._finalize_join, [
  177.                 weakref.ref(self._thread)], exitpriority = -5)
  178.         
  179.         self._close = Finalize(self, Queue._finalize_close, [
  180.             self._buffer,
  181.             self._notempty], exitpriority = 10)
  182.  
  183.     
  184.     def _finalize_join(twr):
  185.         debug('joining queue thread')
  186.         thread = twr()
  187.         if thread is not None:
  188.             thread.join()
  189.             debug('... queue thread joined')
  190.         else:
  191.             debug('... queue thread already dead')
  192.  
  193.     _finalize_join = staticmethod(_finalize_join)
  194.     
  195.     def _finalize_close(buffer, notempty):
  196.         debug('telling queue thread to quit')
  197.         notempty.acquire()
  198.         
  199.         try:
  200.             buffer.append(_sentinel)
  201.             notempty.notify()
  202.         finally:
  203.             notempty.release()
  204.  
  205.  
  206.     _finalize_close = staticmethod(_finalize_close)
  207.     
  208.     def _feed(buffer, notempty, send, writelock, close):
  209.         debug('starting thread to feed data to pipe')
  210.         is_exiting = is_exiting
  211.         import util
  212.         nacquire = notempty.acquire
  213.         nrelease = notempty.release
  214.         nwait = notempty.wait
  215.         bpopleft = buffer.popleft
  216.         sentinel = _sentinel
  217.         if sys.platform != 'win32':
  218.             wacquire = writelock.acquire
  219.             wrelease = writelock.release
  220.         else:
  221.             wacquire = None
  222.         
  223.         try:
  224.             while None:
  225.                 
  226.                 try:
  227.                     if not buffer:
  228.                         nwait()
  229.                 finally:
  230.                     nrelease()
  231.  
  232.                 
  233.                 try:
  234.                     while None:
  235.                         obj = bpopleft()
  236.                         wacquire()
  237.                         
  238.                         try:
  239.                             send(obj)
  240.                         finally:
  241.                             wrelease()
  242.  
  243.                     continue
  244.                     except IndexError:
  245.                         continue
  246.                     
  247.                 except Exception:
  248.                     e = None
  249.                     
  250.                     try:
  251.                         if is_exiting():
  252.                             info('error in queue thread: %s', e)
  253.                         else:
  254.                             import traceback as traceback
  255.                             traceback.print_exc()
  256.                     except Exception:
  257.                         pass
  258.                     except:
  259.                         None<EXCEPTION MATCH>Exception
  260.                     
  261.  
  262.                     None<EXCEPTION MATCH>Exception
  263.  
  264.                 return None
  265.  
  266.  
  267.     _feed = staticmethod(_feed)
  268.  
  269. _sentinel = object()
  270.  
  271. class JoinableQueue(Queue):
  272.     
  273.     def __init__(self, maxsize = 0):
  274.         Queue.__init__(self, maxsize)
  275.         self._unfinished_tasks = Semaphore(0)
  276.         self._cond = Condition()
  277.  
  278.     
  279.     def __getstate__(self):
  280.         return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
  281.  
  282.     
  283.     def __setstate__(self, state):
  284.         Queue.__setstate__(self, state[:-2])
  285.         (self._cond, self._unfinished_tasks) = state[-2:]
  286.  
  287.     
  288.     def put(self, obj, block = True, timeout = None):
  289.         if not not (self._closed):
  290.             raise AssertionError
  291.         if not self._sem.acquire(block, timeout):
  292.             raise Full
  293.         self._sem.acquire(block, timeout)
  294.         self._notempty.acquire()
  295.         self._cond.acquire()
  296.         
  297.         try:
  298.             if self._thread is None:
  299.                 self._start_thread()
  300.             
  301.             self._buffer.append(obj)
  302.             self._unfinished_tasks.release()
  303.             self._notempty.notify()
  304.         finally:
  305.             self._cond.release()
  306.             self._notempty.release()
  307.  
  308.  
  309.     
  310.     def task_done(self):
  311.         self._cond.acquire()
  312.         
  313.         try:
  314.             if not self._unfinished_tasks.acquire(False):
  315.                 raise ValueError('task_done() called too many times')
  316.             self._unfinished_tasks.acquire(False)
  317.             if self._unfinished_tasks._semlock._is_zero():
  318.                 self._cond.notify_all()
  319.         finally:
  320.             self._cond.release()
  321.  
  322.  
  323.     
  324.     def join(self):
  325.         self._cond.acquire()
  326.         
  327.         try:
  328.             if not self._unfinished_tasks._semlock._is_zero():
  329.                 self._cond.wait()
  330.         finally:
  331.             self._cond.release()
  332.  
  333.  
  334.  
  335.  
  336. class SimpleQueue(object):
  337.     
  338.     def __init__(self):
  339.         (self._reader, self._writer) = Pipe(duplex = False)
  340.         self._rlock = Lock()
  341.         if sys.platform == 'win32':
  342.             self._wlock = None
  343.         else:
  344.             self._wlock = Lock()
  345.         self._make_methods()
  346.  
  347.     
  348.     def empty(self):
  349.         return not self._reader.poll()
  350.  
  351.     
  352.     def __getstate__(self):
  353.         assert_spawning(self)
  354.         return (self._reader, self._writer, self._rlock, self._wlock)
  355.  
  356.     
  357.     def __setstate__(self, state):
  358.         (self._reader, self._writer, self._rlock, self._wlock) = state
  359.         self._make_methods()
  360.  
  361.     
  362.     def _make_methods(self):
  363.         recv = self._reader.recv
  364.         racquire = self._rlock.acquire
  365.         rrelease = self._rlock.release
  366.         
  367.         def get():
  368.             racquire()
  369.             
  370.             try:
  371.                 return recv()
  372.             finally:
  373.                 rrelease()
  374.  
  375.  
  376.         self.get = get
  377.  
  378.  
  379.